package cloud.xiguapi.ubas.analysis.advertising.model;

import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

/**
 * Window function that packages the count of one keyed window into a
 * {@link ProvinceAdvertisingCountView}.
 *
 * <p>The key is passed through as the province, the window end is rendered as
 * text, and the first element of the window input is used as the count.
 *
 * @author 大大大西西瓜皮🍉
 * date: 2021-05-19 06:03 PM
 */
public class ProvinceAdvertisingCountResult
        implements WindowFunction<Long, ProvinceAdvertisingCountView, String, TimeWindow> {

    /**
     * Emits exactly one {@link ProvinceAdvertisingCountView} for the given window.
     *
     * @param province key of the window; copied into the view's {@code province} field
     * @param window   window being evaluated; only its end timestamp is read
     * @param input    values of the window; only the first element is read
     * @param out      collector that receives the single resulting view
     * @throws Exception declared by the {@link WindowFunction} contract; this
     *                   implementation propagates whatever the iterator, builder
     *                   or collector throw
     */
    @Override
    public void apply(String province, TimeWindow window, Iterable<Long> input, Collector<ProvinceAdvertisingCountView> out) throws Exception {
        // Timestamp(long) interprets the value as epoch milliseconds; toString()
        // renders it in the JDBC timestamp escape format.
        final String windowEndText = new Timestamp(window.getEnd()).toString();

        // Only the first element is consumed.
        // NOTE(review): presumably the input holds a single pre-aggregated count
        // per window - confirm against the upstream aggregate function.
        final Long total = input.iterator().next();

        final ProvinceAdvertisingCountView view = ProvinceAdvertisingCountView.builder()
                .province(province)
                .windowEnd(windowEndText)
                .count(total)
                .build();

        out.collect(view);
    }
}